package main

import (
	"errors"
	"syscall"
)

// ErrUnexpectedEOF is returned by Stream.Write when the underlying write
// reports zero bytes written, without an error, before the whole buffer has
// been sent.
var ErrUnexpectedEOF = errors.New("unexpected EOF")

// Stream is a handle around a file descriptor. Incoming data is delivered
// through a read callback driven by readiness events (see streamIO), while
// Write pushes data to the descriptor synchronously.
type Stream struct {
	Handle

	// readCallback is invoked by read and EOF with received data, with a
	// zero count on end of stream, or with -1 on a read error. It is set by
	// ReadStart and cleared by Init.
	readCallback       ReadCallback
	// connectionCallback is cleared by Init; it is not otherwise used in
	// the code visible here.
	connectionCallback ConnectionCallback

	// io carries the stream's descriptor (io.fd) and is used to start and
	// stop interest in epoll events for it.
	io         io
	// acceptedFd is a pending accepted descriptor waiting to be claimed via
	// Accept, or -1 when there is none.
	acceptedFd int
}

// Init resets the stream to its pristine state.
//
// The embedded Handle is initialized with l and t first. Afterwards no
// accepted descriptor is pending, no callbacks are registered, and the io
// watcher is initialized with streamIO as its callback and -1 as its second
// argument (presumably "no descriptor yet"; open later stores the real fd).
func (this *Stream) Init(l *Loop, t int) {
	this.Handle.Init(l, t)

	// Nothing accepted and nobody listening for data or connections yet.
	this.acceptedFd = -1
	this.readCallback, this.connectionCallback = nil, nil

	this.io.init(this.streamIO, -1)
}

// read pulls available data off the stream's descriptor and hands every
// chunk to the read callback.
//
// A single call performs at most 32 reads, each into a freshly allocated
// 64 KiB buffer. It stops early when the callback has been cleared, when the
// reading flag has been dropped (both may happen from inside the callback),
// when the descriptor would block, on EOF, on error, or after a short read.
//
// The callback is invoked as (stream, n, buf): n > 0 with the buffer for
// data, n == -1 with a nil buffer for a read error. EOF is reported via EOF.
func (this *Stream) read() {
	const (
		maxReads  = 32
		chunkSize = 64 * 1024
	)

	// A previous partial-read marker is stale once we start reading again.
	this.Flags &^= HandleStatusReadPartial

	for remaining := maxReads; remaining > 0; remaining-- {
		if this.readCallback == nil || !ISet(this.Flags, HandleStatusReading) {
			return
		}

		buf := make([]byte, chunkSize)

		var (
			n   int
			err error
		)
		for {
			n, err = syscall.Read(this.io.fd, buf)
			if n >= 0 || err != syscall.EINTR {
				break
			}
			// Interrupted before any data arrived: just try again.
		}

		if n < 0 {
			if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
				// Nothing to read right now; keep waiting for readability
				// as long as the user still wants data.
				if ISet(this.Flags, HandleStatusReading) {
					this.io.start(this.Loop, syscall.EPOLLIN)
				}
				return
			}

			// Hard error: tell the user first, then stop reading unless the
			// callback already did so itself.
			this.readCallback(this, -1, nil)
			if ISet(this.Flags, HandleStatusReading) {
				this.Flags &^= HandleStatusReading
				this.io.stop(this.Loop, syscall.EPOLLIN)
				if !this.io.active(syscall.EPOLLOUT) {
					this.Handle.stop()
				}
			}
			return
		}

		if n == 0 {
			this.EOF()
			return
		}

		this.readCallback(this, n, buf)

		// A short read means the descriptor has been drained for now.
		if n < len(buf) {
			this.Flags |= HandleStatusReadPartial
			return
		}
	}
}

// EOF puts the stream into its end-of-stream state and notifies the user.
//
// The EOF flag is set and the reading flag cleared, interest in EPOLLIN is
// stopped, and the handle itself is stopped unless EPOLLOUT is still active
// on the watcher. Only then is the read callback invoked with a zero count
// and a nil buffer.
func (this *Stream) EOF() {
	// Set the EOF bit first, then drop the reading bit.
	this.Flags = (this.Flags | HandleStatusReadEof) &^ HandleStatusReading

	this.io.stop(this.Loop, syscall.EPOLLIN)
	if writing := this.io.active(syscall.EPOLLOUT); !writing {
		this.Handle.stop()
	}

	this.readCallback(this, 0, nil)
}

// streamIO is the watcher callback registered in Init; it reacts to the
// epoll events reported for the stream's descriptor. The loop argument is
// not used.
func (this *Stream) streamIO(l *Loop, events uint32) {
	// POLLHUP alone does not end the stream: data may still be buffered, so
	// a hang-up triggers a read just like POLLIN and POLLERR do.
	const readEvents = syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP
	if ISet(events, readEvents) {
		this.read()
	}

	// The read callback may have closed the stream.
	if this.Fd == -1 {
		return
	}

	// Report EOF on hang-up only when all of the following hold:
	//   - the peer hung up (POLLHUP),
	//   - the user is still interested in read events,
	//   - read() ended on a partial read, i.e. the descriptor is drained,
	//   - EOF has not been reported already (read() does that itself when
	//     it sees a zero-length read).
	// Without the partial-read flag there may still be data left to read,
	// so EOF cannot be reported yet.
	if !ISet(events, syscall.EPOLLHUP) {
		return
	}
	if !ISet(this.Flags, HandleStatusReading) {
		return
	}
	if !ISet(this.Flags, HandleStatusReadPartial) {
		return
	}
	if ISet(this.Flags, HandleStatusReadEof) {
		return
	}
	this.EOF()
}

// Write writes b to the stream's descriptor, calling syscall.Write
// repeatedly until every byte has been written or a write fails.
//
// It returns the number of bytes written so far together with the error that
// stopped the loop, if any. A write interrupted by a signal (EINTR) is
// retried rather than reported, matching what read does. Any other error,
// including EAGAIN from a non-blocking descriptor, is returned to the caller
// along with the partial count. A write that reports zero bytes without an
// error yields ErrUnexpectedEOF.
func (this *Stream) Write(b []byte) (int, error) {
	var nn int
	l := len(b)
	for {
		n, err := syscall.Write(this.io.fd, b[nn:])
		if n > 0 {
			nn += n
		}
		if err == syscall.EINTR {
			// Interrupted by a signal: retry with whatever is left.
			continue
		}
		if nn == l {
			return nn, err // complete
		}
		if err != nil {
			return nn, err
		}
		if n == 0 {
			// No progress and no error: bail out instead of spinning.
			return nn, ErrUnexpectedEOF
		}
	}
}

// Accept hands the pending accepted descriptor over to s.
//
// It returns syscall.EAGAIN when no accepted descriptor is pending. If s
// cannot be opened on the descriptor, the descriptor is closed and the error
// from open is returned. On success s is additionally marked as bound.
//
// In every case where a descriptor was pending, this stream no longer owns
// it afterwards: acceptedFd is reset to -1.
func (this *Stream) Accept(s *Stream) error {
	if this.acceptedFd == -1 {
		return syscall.EAGAIN
	}

	// Take the descriptor out of the listener up front so that it is never
	// handed out or closed twice, whichever way this call ends.
	fd := this.acceptedFd
	this.acceptedFd = -1

	if err := s.open(fd, HandleStatusReadable); err != nil {
		// Best effort: the open error is what the caller needs to see.
		_ = syscall.Close(fd)
		return err
	}

	s.Flags |= HandleStatusBound
	return nil
}

// open attaches descriptor fd to the stream and merges flags into the
// handle's flags.
//
// It fails with syscall.EBUSY when the stream already holds a different
// descriptor. If the resulting flags request TCP no-delay or keep-alive, the
// corresponding option is applied to fd; a failure there is returned and the
// descriptor is not stored. Note that flags have already been merged into
// this.Flags by that point.
func (this *Stream) open(fd int, flags uint32) error {
	if this.io.fd != -1 && this.io.fd != fd {
		return syscall.EBUSY
	}

	this.Flags |= flags

	if ISet(this.Flags, HandleStatusTCPNodelay) {
		if err := tcpNodelay(fd, 1); err != nil {
			return err
		}
	}

	if ISet(this.Flags, HandleStatusTCPKeepalive) {
		if err := tcpKeepalive(fd, 1); err != nil {
			return err
		}
	}

	this.io.fd = fd
	return nil
}

// Close releases the descriptors owned by the stream.
//
// If the stream holds a descriptor, interest in EPOLLIN is stopped on the
// stream's loop and the descriptor is closed. A pending accepted descriptor
// that was never claimed through Accept is closed as well. Both fields are
// reset to -1 afterwards, so calling Close again is a no-op instead of
// closing a descriptor number that may since have been reused.
func (this *Stream) Close() {
	if this.io.fd != -1 {
		this.io.stop(this.Loop, syscall.EPOLLIN)
		// Nothing useful can be done about a failed close here.
		_ = syscall.Close(this.io.fd)
		this.io.fd = -1
	}
	if this.acceptedFd != -1 {
		_ = syscall.Close(this.acceptedFd)
		this.acceptedFd = -1
	}
}

// ReadStart begins delivering incoming data to rcb.
//
// It returns syscall.EINVAL if the handle is closing and syscall.ENOTCONN if
// the stream is not readable. Otherwise rcb is installed as the read
// callback, the reading flag is set, and interest in EPOLLIN is started on
// the stream's loop.
func (this *Stream) ReadStart(rcb ReadCallback) error {
	switch {
	case ISet(this.Flags, HandleStatusClosing):
		return syscall.EINVAL
	case !ISet(this.Flags, HandleStatusReadable):
		return syscall.ENOTCONN
	}

	this.readCallback = rcb
	this.Flags |= HandleStatusReading

	this.io.start(this.Loop, syscall.EPOLLIN)
	return nil
}
